package com.flow.framework.schedule.system.checker.impl;

import com.flow.framework.common.error.SystemErrorCode;
import com.flow.framework.common.exception.CheckedException;
import com.flow.framework.common.health.ServiceHealthCheckCode;
import com.flow.framework.common.util.io.IoUtil;
import com.flow.framework.common.util.verify.VerifyUtil;
import com.flow.framework.core.system.checker.AbstractSystemStatusChecker;
import com.flow.framework.schedule.properties.rpc.RpcConfigProperties;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
import java.util.*;

/**
 * 调度器网络监控（基于TCP的监控）
 *
 * @author luoguopiao
 * @version 0.0.1
 * @date 2022/4/10
 */
@Slf4j
public class RemoteScheduleNetworkChecker extends AbstractSystemStatusChecker {

    private Map<String, Integer> hostAndPortMap = new HashMap<>();

    public RemoteScheduleNetworkChecker(RpcConfigProperties rpcConfigProperties) {
        String adminAddresses = rpcConfigProperties.getAdminAddresses();
        if (VerifyUtil.isEmpty(adminAddresses)) {
            return;
        }
        String[] addresses = adminAddresses.split(",");
        try {
            for (String address : addresses) {
                URL url = new URL(address);
                String host = url.getHost();
                int port = url.getPort();
                hostAndPortMap.put(host, port);
            }
        } catch (Exception e) {
            log.error("init check schedule error.", e);
            throw new CheckedException(SystemErrorCode.PARAMS_ERROR, "init check schedule error.", e);
        }
    }

    @Override
    public int getServiceHealthCheckCode() {
        return ServiceHealthCheckCode.SERVICE_SCHEDULER_CODE;
    }

    @Override
    protected Set<String> executeAsyncHealthCheck() {
        if (VerifyUtil.isEmpty(hostAndPortMap)) {
            return Collections.emptySet();
        }
        Set<String> unhealthyTags = new HashSet<>();
        Set<Map.Entry<String, Integer>> entries = hostAndPortMap.entrySet();
        for (Map.Entry<String, Integer> entry : entries) {
            String host = entry.getKey();
            int port = entry.getValue();
            Socket socket = new Socket();
            try {
                socket.connect(new InetSocketAddress(host, port), 1000);
            } catch (Exception e) {
                log.error("check remote error, host: {}, port : {}", host, port);
                unhealthyTags.add(host + ":" + port);
            } finally {
                IoUtil.close(socket);
            }
        }
        return unhealthyTags;
    }

    @Override
    public long getInitialDelay() {
        return 0;
    }

    @Override
    public long getPeriod() {
        return 30000;
    }
}
